home *** CD-ROM | disk | FTP | other *** search
/ Cream of the Crop 26 / Cream of the Crop 26.iso / os2 / pvm34b3.zip / pvm34b3 / pvm3 / src / lpvmmpp.c < prev    next >
C/C++ Source or Header  |  1997-07-22  |  18KB  |  788 lines

  1.  
  2. static char rcsid[] =
  3.     "$Id: lpvmmpp.c,v 1.10 1997/07/21 14:31:57 pvmsrc Exp $";
  4.  
  5. /*
  6.  *         PVM version 3.4:  Parallel Virtual Machine System
  7.  *               University of Tennessee, Knoxville TN.
  8.  *           Oak Ridge National Laboratory, Oak Ridge TN.
  9.  *                   Emory University, Atlanta GA.
  10.  *      Authors:  J. J. Dongarra, G. E. Fagg, M. Fischer
  11.  *          G. A. Geist, J. A. Kohl, R. J. Manchek, P. Mucci,
  12.  *         P. M. Papadopoulos, S. L. Scott, and V. S. Sunderam
  13.  *                   (C) 1997 All Rights Reserved
  14.  *
  15.  *                              NOTICE
  16.  *
  17.  * Permission to use, copy, modify, and distribute this software and
  18.  * its documentation for any purpose and without fee is hereby granted
  19.  * provided that the above copyright notice appear in all copies and
  20.  * that both the copyright notice and this permission notice appear in
  21.  * supporting documentation.
  22.  *
  23.  * Neither the Institutions (Emory University, Oak Ridge National
  24.  * Laboratory, and University of Tennessee) nor the Authors make any
  25.  * representations about the suitability of this software for any
  26.  * purpose.  This software is provided ``as is'' without express or
  27.  * implied warranty.
  28.  *
  29.  * PVM version 3 was funded in part by the U.S. Department of Energy,
  30.  * the National Science Foundation and the State of Tennessee.
  31.  */
  32.  
  33. /*
  34.  *    lpvmmpp.c
  35.  *
  36.  *    support routines for the MPP environment
  37.  *
  38.  */
  39.  
  40. #include <stdio.h>
  41. #include <rpc/types.h>
  42. #include <rpc/xdr.h>
  43. #ifdef    SYSVSTR
  44. #include <string.h>
  45. #else
  46. #include <strings.h>
  47. #endif
  48. #include <errno.h>
  49.  
  50. #include <pvm3.h>
  51. #include "global.h"
  52. #include "pvmalloc.h"
  53. #include "pvmfrag.h"
  54. #include "listmac.h"
  55. #include "bfunc.h"
  56. #include <pvmtev.h>
  57. #include <pvmproto.h>
  58. #include "tevmac.h"
  59. #include "mppmsg.h"
  60. #include "mppchunk.h"
  61. #include "lmsg.h"
  62. #include "pmsg.h"
  63. #include "pvmmimd.h"
  64. #include "global.h"
  65. #include "lpvm.h"
  66. #ifndef max
  67. #define max(a,b)    ((a)>(b)?(a):(b))
  68. #endif
  69. /***************
  70.  **  Globals  **
  71.  **           **
  72.  ***************/
  73.  
  74. /* Indicate that we are able to receive directly into a users buffer */
  75.  
  76. extern struct msgid *pvm_inprecv;        /* from lpvm.c */
  77. int pvmhostnode;
  78.  
  79. /***************
  80.  **  Private  **
  81.  **           **
  82.  ***************/
  83.  
  84. static int pvmmynode;
  85. static int pvmpartsize;
  86. static int pvmmyndf;
  87.  
  88. static int pvmtidnmask = TIDNODE;
  89. static int pvmtidpmask = TIDPTYPE;
  90. static int pvmtidhmask = TIDHOST;
  91. static int pvmmyptype;
  92.  
  93. static struct frag * pvm_frReady __ProtoGlarp__((
  94. MSG_INFO_PTR, int, MSGFUNC_PTR, MPP_DIRECTI_PTR, int, int *, struct frag ** ));
  95.  
  96. static MPP_DIRECTI_PTR find_direct __ProtoGlarp__((MPP_DIRECTI_PTR, int, int ));
  97. /* header of posted precvs */
  98.  
  99. static struct msgid *precvIds = (struct msgid *) NULL; 
  100.  
  101. static MSG_INFO_PTR nodefrags = (MSG_INFO_PTR) NULL; /* pre-posted recv bufs */
  102. static MSG_INFO_PTR pvmdfrags = (MSG_INFO_PTR) NULL; /* pre-posted recv bufs */
  103. static MPP_DIRECTI_PTR pvmddirect = (MPP_DIRECTI_PTR) NULL;
  104. static MPP_DIRECTI_PTR peerdirect = (MPP_DIRECTI_PTR) NULL;
  105.  
  106. static struct ttpcb *peerpcbs = (struct ttpcb *) NULL;
  107. static struct ttpcb *pvmdpcb = (struct ttpcb *) NULL;
  108. static struct pmsg *rxpmsgs = (struct pmsg *) NULL;
  109. static int inplaceDelay = 0;
  110.  
  111. struct pmsg *  midtobuf();
  112. char * getenv();
  113.  
  114. /**************************
  115.  **  Internal Functions  **
  116.  **                      **
  117.  **************************/
  118.  
  119.  
  120. /* ------------ pvm_mppbeatask --------- */
  121. /*    
  122. *
  123. *    Initialize libpvm, config process as a task.
  124. *    This is called as the first step of each libpvm function so no
  125. *    explicit initialization is required.
  126. *
  127. *    Returns 0 if okay, else error code.
  128. */
  129.  
  130. int
  131. pvm_mpp_beatask(mytid, myptid, 
  132.                 outtid, outctx, outtag,
  133.                 trctid, trcctx, trctag,
  134.                 udpmtu, schedtid, topvmd)
  135. int *mytid, *myptid; 
  136. int *outtid, *outctx, *outtag;
  137. int *trctid, *trcctx, *trctag;
  138. int *udpmtu, *schedtid;
  139. struct ttpcb **topvmd;
  140. {
  141.     char errtxt[64];
  142.     char *p;
  143.     char *s;
  144.  
  145.     int ac = 0;
  146.     int cc;
  147.     int hostid = 0;
  148.     int i;
  149.     int myhost;
  150.     int mynode, mysetpart;
  151.     int partsize, partid;
  152.     int pvminfo[SIZEHINFO];        /* proto, hostpart, ptid, MTU, NDF */
  153.  
  154.     msgmid_t rmid;                /* msg ID returned by imsgrecv() */
  155.  
  156.     info_t minfo[MPPINFOSIZE];    /* info that might be returned by msgdone */
  157.     
  158.     MSGFUNC_PTR mfunc;
  159.  
  160.     if (pvmmytid != -1)            /* already configured */
  161.         return 0;
  162.  
  163.     mfunc = pvm_hostmsgfunc();
  164.  
  165.     pvm_mpp_message_init(&mynode, &partsize, &pvmhostnode, &pvmmyptype);
  166.  
  167.     myhost = pvmhostnode;
  168.     
  169.     if (mynode < 0 || partsize < 0 || myhost < 0)
  170.     {
  171.         sprintf(errtxt, "mppbeatask(): bad init, node %d, part %d, host %d\n",
  172.                 mynode, partsize, myhost);
  173.         pvmlogerror(errtxt);
  174.         return PvmSysErr;
  175.     }     
  176.  
  177.     if (pvmdebmask & PDMPACKET) 
  178.     {
  179.         sprintf(errtxt," %d Posting receive for config message \n", mynode);
  180.         pvmlogerror(errtxt);
  181.     }
  182.  
  183.     /* Post receive for the configuration message */
  184.     if ((*mfunc->imsgrecv)(hostid, MPPANY, PMTCONF, (char *) pvminfo,
  185.              sizeof(pvminfo), MPPANY, (int *) NULL, &rmid) < 0) 
  186.     {
  187.         pvmlogperror("beatask() recv pvminfo");
  188.         return PvmSysErr;
  189.     }
  190.  
  191.     if (pvmdebmask & PDMPACKET) {
  192.         sprintf(errtxt," %d Receive posted for config message \n", mynode);
  193.         pvmlogerror(errtxt);
  194.     }
  195.  
  196.     if ((pvm_useruid = getuid()) == -1) {
  197.         pvmlogerror("can't getuid()\n");
  198.         return PvmSysErr;
  199.     }
  200.  
  201.     pvmmyupid = getpid();
  202.  
  203.     /*
  204.     *    initialize received-message list 
  205.     */
  206.  
  207.     rxpmsgs = TALLOC(1, struct pmsg, "pmsgs");
  208.     BZERO((char*)rxpmsgs, sizeof(struct pmsg));
  209.     rxpmsgs->m_link = rxpmsgs->m_rlink = rxpmsgs;
  210.  
  211.     if (pvmdebmask & PDMPACKET) {
  212.         sprintf(errtxt," %d Waiting  for config message \n", mynode);
  213.         pvmlogerror(errtxt);
  214.     }
  215.     while (!((*mfunc->msgdone)(0, &rmid, minfo)));
  216.  
  217.     if (pvmdebmask & PDMPACKET) {
  218.         sprintf(errtxt," %d Got config message \n", mynode);
  219.         pvmlogerror(errtxt);
  220.     }
  221.  
  222.     if (pvminfo[0] != TDPROTOCOL) {
  223.         sprintf(errtxt, "beatask() t-d protocol mismatch (%d/%d)\n",
  224.             TDPROTOCOL, pvminfo[0]);
  225.         pvmlogerror(errtxt);
  226.         return PvmSysErr;
  227.     }
  228.  
  229.     /* We now have the configuration message -- set up various parameters
  230.         based on the message */
  231.  
  232.     pvmmynode = mynode;
  233.     mysetpart = pvminfo[1];
  234.     *myptid = pvminfo[2];
  235.     *udpmtu = pvminfo[3];
  236.     pvmmyndf = pvminfo[4];
  237.     pvmpartsize = pvminfo[5];
  238.     *outtid = pvminfo[6];
  239.     *outtag = pvminfo[7];
  240.     *outctx = pvminfo[8];
  241.     *trctid = pvminfo[9];
  242.     *trctag = pvminfo[10];
  243.     *trcctx = pvminfo[11];
  244.     *mytid = mysetpart + pvmmynode;
  245.     pvmfrgsiz = pvmudpmtu;
  246.  
  247.     /* ---- set up the pre-allocated receive buffers ---- */
  248.  
  249.     pvmddirect = new_directstruct( NSBUFS, NRBUFS );
  250.     peerdirect = new_vdirectstruct( pvmpartsize, NSBUFS, NRBUFS);
  251.  
  252.     pvmdfrags = init_recv_list(NSBUFS, PMTDBASE, MAXFRAGSIZE, 0, MPPANY, 
  253.                 pvm_hostmsgfunc());
  254.     nodefrags = init_recv_list(NSBUFS, PMTPBASE, MAXFRAGSIZE, 0, MPPANY,
  255.                 pvm_nodemsgfunc()); 
  256.  
  257.     /* intialize the packet numbering for packets from daemon */
  258.     fill_directstruct (pvmddirect, NRBUFS, pvm_tidtohost(pvmmytid), 
  259.         0, PMTDBASE, 0, MPPANY);
  260.     init_chunkostruct( pvmddirect->ordering, NSBUFS);
  261.  
  262.     /* intialize the packet numbering for packets from peers */
  263.     for (i = 0; i < pvmpartsize; i ++)
  264.     {
  265.         fill_directstruct (peerdirect + i, NRBUFS, i, 
  266.             0, PMTPBASE, 0, MPPANY);
  267.         init_chunkostruct( (peerdirect+i)->ordering, NSBUFS);
  268.     }
  269.  
  270.     /* ---- Create Task, pvmd  PCBs so that they can be found easily ----- */
  271.  
  272.     peerpcbs = TALLOC(pvmpartsize + 1, struct ttpcb, "pcbs");
  273.     for (i = 0; i <= pvmpartsize; i ++)
  274.     {
  275.         BZERO((char *)(peerpcbs + i), sizeof(struct ttpcb));
  276.         (peerpcbs + i) -> tt_tid = i;
  277.         (peerpcbs + i) -> tt_state = TTOPEN;
  278.         (peerpcbs + i) -> mpdirect = peerdirect + i;
  279.     } 
  280.  
  281.     pvmdpcb = peerpcbs + pvmpartsize;    
  282.     pvmdpcb -> mpdirect = pvmddirect;
  283.  
  284.     *topvmd = pvmdpcb;
  285.     
  286.     precvIds = msgid_new();
  287.     precvIds->ms_link = precvIds->ms_rlink = precvIds;
  288.  
  289.     if (s = getenv("PVMINPLACEDELAY"))
  290.     {
  291.         inplaceDelay = atoi(s);
  292.         pvmlogprintf("setting in place delay to %d \n", inplaceDelay);
  293.     }
  294.     else
  295. #if !defined(IMA_PGON)
  296.         inplaceDelay = 0;
  297. #else
  298.         inplaceDelay = 250;                /* uSec. Seems to work well */
  299. #endif
  300.  
  301.     pvm_setopt(PvmRoute, PvmDontRoute); /* Deny direct routing */
  302.     return 0;
  303.  
  304.  
  305.     
  306. }
  307.  
  308. /* ------ pvm_node_send ------- */
  309. /* this routine sends a single array of data (a pvm fragment) to a process.
  310.  * The semantics are such that the send is asynchronous with the send 
  311.  * message id added to the sendmsg list.
  312.  *
  313.  * inputs:
  314.  *     cp         - array of data to be sent
  315.  *  len     - length  of data
  316.  *  ttpcbp    - pointer to the task control structure
  317.  *  sendmsg - list of sendmsgs to add this one to (or merge with for PGONs)
  318.  *    inPlaceHeader - if this is an inplace header, this is == cp
  319.  *    inPlaceBodyLen - length of the inplace body that will follow this header
  320.  *
  321.  *    outputs:
  322.  *       returns #bytes queued for sending 
  323.  *      sendmsg is initialized if *sendmsg == NULL, otherwise the message is 
  324.  *                added to the list of outstanding send messages    
  325. */
  326. int
  327. pvm_node_send(cp, len, ttpcbp, smsglist, inPlaceHeader, inPlaceBodyLen)
  328.     char *cp;                /* this is what we are supposed to be sending */
  329.     int len;                /* this is the length of the frag */
  330.     struct ttpcb * ttpcbp;    /* info about where this is going */
  331.     struct msgid **smsglist; /* msgid for this send */
  332.     char *inPlaceHeader;    /* This is an inplace header? */
  333.     int inPlaceBodyLen;        /* Length of inplace fragment */
  334. {
  335.     int cc;
  336.     int dtid;
  337.     int mask;                        /* used to mask different parts of tid */
  338.     msgmid_t mid;
  339.     int node;                        /* destination node */
  340.     int partid;                        /* destination partition id */
  341.     int tag;
  342.  
  343.  
  344.     MSGFUNC_PTR    mfunc;    
  345.  
  346. #if defined(IMA_PGON)
  347.     double dclock();
  348. #endif
  349.     static double lastClock;
  350.     double delay;
  351.  
  352.     static int savelen = -1;/* save the length of inplace body for next call */
  353.     
  354.     MPP_DIRECTI_PTR tdirect;
  355.  
  356.     struct msgid *cmsgid = (struct msgid *) NULL;
  357.  
  358.     mask = TIDHOST;
  359.  
  360.     dtid = ttpcbp -> tt_tid;
  361.  
  362.     if (TIDISNODE(dtid) && (dtid & pvmtidhmask) == (pvmmytid & pvmtidhmask)
  363.         && (dtid & pvmtidpmask) == (pvmmytid & pvmtidpmask)) 
  364.     {
  365.         node = dtid & pvmtidnmask;
  366.         partid = pvmmyptype;            /* send to node directly */
  367.         mfunc = pvm_nodemsgfunc();        /* point to node-node routines */
  368.     } 
  369.     else 
  370.     {
  371.         node = pvmhostnode;
  372.         partid = PVMDPTYPE;            /* send to pvmd first */
  373.         mfunc = pvm_hostmsgfunc();        /* point to host-node routines */
  374.     }
  375.  
  376.     
  377.     tdirect = ttpcbp->mpdirect;
  378.  
  379.     if (inPlaceHeader)
  380.     {
  381.         savelen = inPlaceBodyLen;
  382. #if defined(IMA_PGON)
  383.         if (savelen > 4096 )
  384.         {
  385.             lastClock = dclock();    
  386.             delay = (double) inplaceDelay / 1e6;
  387.         }
  388. #endif
  389.     }
  390.  
  391.     if ( savelen >= 0  && inPlaceHeader == (char *) NULL) 
  392.     {
  393.         tag = pvmmynode;        /* send inplace body with my physical node # */
  394. #if defined(IMA_PGON)
  395.         if (savelen > 4096 )
  396.             while ((dclock() - lastClock) < delay); 
  397. #endif
  398.         savelen = -1;              /* as the tag */
  399.     }
  400.     else
  401.     {
  402.         tag = tdirect->tagbase + tdirect->sseq;
  403.  
  404.         if (++(tdirect->sseq) >= tdirect->nbufs) 
  405.                 tdirect->sseq = 0;
  406.     }
  407.  
  408.     {
  409.         if ((cc = (*mfunc->imsgsend)(0,tag, cp, len, node, partid, &mid))
  410.                 && (cc < 0)) 
  411.         {
  412.             pvmlogperror("node_send() IMSGSEND \n ");
  413.             return PvmSysErr;
  414.         }
  415.  
  416.         /* can't merge id's  add to the list */
  417.         if (!(mfunc->msgmerge)  || !(*smsglist)) 
  418.         {
  419.             if ( (cmsgid = msgid_new()) == (struct msgid *) NULL)
  420.             {
  421.                 pvmlogerror("node_send(): couldn't allocate send msgid\n");
  422.                 return PvmSysErr;
  423.             }
  424.             if (! (*smsglist))      /* intialize the list */
  425.             {
  426.                 *smsglist = cmsgid;
  427.                 cmsgid->ms_link = cmsgid->ms_rlink = cmsgid;
  428.             }
  429.             else                /* put at the end of the list of message id's */
  430.                 LISTPUTBEFORE(*smsglist, cmsgid, ms_link, ms_rlink);
  431.         }
  432.         else
  433.         {
  434.             cmsgid = *smsglist;
  435.             cmsgid -> id = (*mfunc->msgmerge)(&(cmsgid->id), &mid);
  436.         }
  437.  
  438.         if (cmsgid -> otid != node)
  439.         {
  440.             cmsgid -> id = mid;
  441.             cmsgid -> otid = node;
  442.             cmsgid -> complete = 0;
  443.             cmsgid -> len += len;
  444.             cmsgid -> mfunc = mfunc;
  445.         }
  446.     
  447.     }
  448.     return len;
  449. }
  450.  
  451.  
  452. /* ---------- pvm_node_mcast --------- */
  453. /* This multicasts to tasks - it works takes an array of tids
  454.  * and sends directly to tasks that are on the local partition using
  455.  * pvm_send.
  456.  * It returns a list of off-host tids and the number of off-host
  457.  * tids. The off-host tids should then be sent to using pvm_mcast()
  458.  * semantics.
  459. */
  460. int
  461. pvm_node_mcast(tids, count, code, offhtids, offhcnt)
  462. int *tids;     /* array of tids, on and off host */
  463. int count;    /* #of tids */
  464. int code;    /* msgtag tag */
  465. int **offhtids; /* array of tids going off host, returns null if none */
  466. int *offhcnt; /* # of tids going off host, returns 0 if none */
  467. {
  468.  
  469.     int i;
  470.     int mask = pvmtidhmask;        /* host */
  471.     int nlocal = 0;
  472.     int localidx = count;
  473.     int nremote = 0;
  474.     int *tmptids = (int *) NULL;
  475.     *offhtids = (int *) NULL;
  476.     *offhcnt = 0;    
  477.  
  478.     if (!tids || count < 0)         /* no tids, or bogus count */
  479.         return PvmOk;
  480.  
  481.     if (! (tmptids = TALLOC(count, int, "nmcast")))
  482.     {
  483.         pvmlogerror("node_mcast(): couldn't alloc memory\n");
  484.         return PvmNoMem;
  485.     }
  486.  
  487.     for (i = 0; i < count; i++)
  488.     {
  489.         if ( TIDISNODE(tids[i]) && (tids[i] & mask) == (pvmmytid & mask))
  490.         {
  491.             /* put the local tids starting at the back of the array */
  492.             tmptids[--localidx] = tids[i];    /* tid local to us */
  493.             nlocal ++;
  494.         }
  495.         else
  496.         {
  497.             /* put non-local tids at the front of the array */
  498.             tmptids[nremote++] = tids[i];
  499.         }
  500.  
  501.     }
  502.  
  503.     /* multicast to the local tasks */
  504.     for (i = 0; i < nlocal; i++)
  505.         pvm_send(tmptids[localidx++], code);      /* loop of sends for now */
  506.  
  507.     if (nlocal == count)
  508.     {
  509.         PVM_FREE(tmptids);
  510.         tmptids = (int *) NULL;
  511.     }
  512.     else
  513.     {
  514.         *offhtids = tmptids;
  515.         *offhcnt = nremote;    
  516.     }
  517.     
  518.     return PvmOk;    
  519. }
  520.  
  521.  
  522.     
  523.     
  524.  
  525.  
  526. int
  527. mpp_pvmendtask()
  528. {
  529.  
  530.     if (pvmmytid != -1) 
  531.     {
  532.         pvmmytid = -1;
  533.  
  534.         pvm_mpp_message_stop();
  535.     }
  536.     
  537.     return 0;
  538. }
  539.  
  540.  
  541. /* mpp-specific stuff on precv */
  542. int
  543. pvm_mppprecv(tid, tag, cp ,len, dt, rtid, rtag, rlen)
  544.     int tid;
  545.     int tag;
  546.     void *cp;
  547.     int len;
  548.     int dt;
  549.     int *rtid;
  550.     int *rtag;
  551.     int *rlen;
  552. {
  553.     char errtxt[64];
  554.  
  555.     int rbf;
  556.     int cc = 0;
  557.     int l;
  558.     int x;
  559.     long ad;
  560.  
  561.     static int first = 1;
  562.     static struct msgid * thisprecv;
  563.  
  564.     /* allocate a new message id for this precv */
  565.  
  566.     if (first)
  567.     {
  568.         if ( !(thisprecv = msgid_new()))
  569.         {
  570.             pvmlogerror("mppprecv(): can't get memory \n");
  571.             return PvmNoMem;
  572.         }
  573.         first = 0;
  574.     }
  575.      
  576.  
  577.     thisprecv -> otid =  tid;
  578.     thisprecv -> tag = tag;
  579.     thisprecv -> ctxt = pvm_getcontext();
  580.     thisprecv -> ubuf = cp;
  581.     thisprecv -> len = len; 
  582.     thisprecv -> complete = 0;
  583.  
  584.     pvm_inprecv = thisprecv;
  585.     
  586.     if (!cc) {
  587.     
  588.         cc = pvm_recv(tid, tag);
  589.  
  590.         if (cc >= 0) {
  591.  
  592.             if (!(thisprecv->complete)) /* receive went into pvm buffers */
  593.             {        
  594.                 pvm_bufinfo(cc, &l, rtag, rtid);
  595.                 if     (rlen)
  596.                     *rlen = l;
  597.                 if (l < len)
  598.                     len = l;
  599.                 if (pvmdebmask & PDMMESSAGE) {
  600.                     sprintf(errtxt, "precv() unpacking message len = %d\n", l);
  601.                     pvmlogerror(errtxt);
  602.                 }
  603.                 pvm_upkbyte((char *)cp, len, 1);
  604.             }
  605.             else /* receive went straight into user memory */
  606.             {
  607.                 if(rlen)
  608.                     *rlen = thisprecv -> len;
  609.                 if (pvmdebmask & PDMMESSAGE) {
  610.                     sprintf(errtxt, "precv() short ckted into user buf len = %d\n", l);
  611.                     pvmlogerror(errtxt);
  612.                 }
  613.             }
  614.             cc = 0;
  615.         }
  616.     }
  617.  
  618.     pvm_inprecv = (struct msgid *) NULL;
  619.  
  620.     if (cc < 0)
  621.         lpvmerr("pvm_mppprecv", cc);
  622.     return cc;
  623. }
  624.  
  625. /* ---------- pvm_mpppsend ------------- */
  626. /* mpp_psend does short cuts on buffer management, 
  627.     o This is a reasonable hack that greatly decreases (overhead)
  628.       latency.
  629.     o It smacks the message structure directly. 
  630. */
  631. int 
  632. pvm_mpppsend(cp, len, tid, tag)
  633. char *cp;
  634. int len;
  635. int tid;
  636. int tag;
  637. {
  638.  
  639.     static char nullusrmsg;
  640.     static int first = 1;
  641.     static int psbuf;
  642.     static struct frag *dfrag;
  643.     static struct pmsg *psmsg;
  644.     static struct timeval ztv = { 0, 0 };
  645.     int savebuf;
  646.     int cc;
  647.  
  648.     /* First call to psend will create a regular pvm buffer for packing.
  649.      *    o    A new message is created as a by-product of pvm_mkbuf.
  650.      *     o    A single (inplace) data buffer is created and put in the frag chain 
  651.      *    o    Later calls will simply twiddle the message buffer and the frag
  652.      */
  653.     if (first)
  654.     {
  655.         psbuf = pvm_mkbuf(PvmDataInPlace); /* Make an inplace data buffer */
  656.         savebuf = pvm_setsbuf(psbuf);
  657.         pvm_pkint(&cc, 1, 1);            /* create a single frag for msg */ 
  658.         pvm_setsbuf(savebuf);
  659.         psmsg = midtobuf(psbuf);        /* save a pointer to the msg struct */
  660.         dfrag = psmsg->m_frag->fr_link;
  661.         first = 0;
  662.     }
  663.  
  664.     psmsg->m_ctx = pvmmyctx;            /* set the context */
  665.  
  666.     if (len)
  667.     {
  668.         dfrag->fr_buf = dfrag->fr_dat = cp;
  669.         dfrag->fr_max = dfrag->fr_len = len;
  670.     }
  671.     else
  672.     {
  673.         dfrag->fr_buf = dfrag->fr_dat = &nullusrmsg;
  674.         dfrag->fr_max = dfrag->fr_len = len;
  675.     }
  676.  
  677.     cc = mroute(psbuf, tid, tag, &ztv); /* route the message */ 
  678.     
  679.     return cc;
  680. }
  681.  
  682.     
  683.  
  684.     
  685.  
  686.  
  687.  
  688. /* ----------- pvm_readfrompvmd ----------- */
  689. struct frag *
  690. pvm_readfrompvmd()
  691. {
  692.     static int hostbuf = 0;                 /* buffer we are working on */
  693.  
  694.     static CHUNK_PTR readyFrags = (CHUNK_PTR) NULL;
  695.  
  696.     int mxbufs = NRBUFS;
  697.  
  698.     MSGFUNC_PTR mfunc;
  699.  
  700.     mfunc = pvm_hostmsgfunc();
  701.  
  702.     /* check to see if any fragments are ready to be processed */
  703.  
  704.     return  (struct frag *) pvm_chunkReady( pvmdfrags, mxbufs, mfunc, 
  705.         pvmddirect, 1, &hostbuf, &readyFrags);   
  706. }
  707.  
  708. /* ----------- pvm_readfrompeer ----------- */
  709. struct frag *
  710. pvm_readfrompeer()
  711. {
  712.     static int nodebuf = 0;                 /* buffer we are working on */
  713.  
  714.     static CHUNK_PTR readyFrags = (CHUNK_PTR) NULL;
  715.  
  716.     int mxbufs = NRBUFS;
  717.  
  718.     MSGFUNC_PTR mfunc;
  719.  
  720.     mfunc = pvm_nodemsgfunc();
  721.  
  722.     /* check to see if any fragments are ready to be processed */
  723.  
  724.     return  (struct frag *) pvm_chunkReady( nodefrags, mxbufs, mfunc, 
  725.             peerdirect, pvmpartsize, &nodebuf, &readyFrags);   
  726. }
  727.  
  728.  
  729. /* ------- init_precvMsgs ----- */
  730. struct msgid *
  731. init_precvMsgs()
  732. {
  733.     return msgid_new();
  734. }
  735.  
  736. /* ---------- mpp_ttpcb_find() ------- */
  737. struct ttpcb *
  738. mpp_ttpcb_find(dtid) 
  739. int dtid;
  740. {
  741.     int i;
  742.  
  743.     if (TIDISNODE(dtid) && (dtid & pvmtidhmask) == (pvmmytid & pvmtidhmask)
  744.         && (dtid & pvmtidpmask) == (pvmmytid & pvmtidpmask)) 
  745.     {
  746.         i = dtid & pvmtidnmask;
  747.         (peerpcbs + i)->tt_tid = dtid;
  748.         return peerpcbs + i;
  749.     }
  750.     else
  751.         return (struct ttpcb *) NULL;
  752. }
  753.  
  754.  
  755. /* ---------- pvm_mpp_pmsgs() ------- */
  756. struct pmsg *
  757. pvm_mpp_pmsgs()
  758. {
  759.     return rxpmsgs;
  760. }
  761.  
  762. /* ----------- find_direct ---------- */
  763. /* this is a hack to find the correct ordering structure for
  764.     a node. 
  765. */
  766. MPP_DIRECTI_PTR 
  767. pvm_find_direct (dlist, nstruct, node)
  768. MPP_DIRECTI_PTR dlist;
  769. int nstruct;
  770. int node;
  771. {
  772.     node = node & pvmtidnmask;  /* make sure this is a node */
  773.  
  774.     if (dlist == pvmddirect)
  775.         return pvmddirect;
  776.     else 
  777.     {
  778.         return dlist + node;
  779.     }    
  780. }
  781.          
  782. struct msgid *
  783. pvm_mpp_get_precvids()
  784. {
  785.         return pvm_inprecv; 
  786. }
  787.  
  788.